package com.gitee.jastee.kafka.group;

import cn.hutool.core.date.DateUtil;
import com.gitee.jastee.kafka.producer.KafkaProducerClient;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
 * 基于时间修改Consumer Offset
 * @author jast
 * @date 2020/4/27 9:49
 */
public class SetConsumerGroupOffsetBasedOnTime {

    /**
     * Builds the Kafka consumer configuration used for offset manipulation.
     *
     * @param brokerList comma-separated broker addresses (host:port)
     * @param groupID    consumer group whose committed offsets will be rewritten
     * @return consumer {@link Properties} with manual offset commits enabled
     */
    private static Properties getProperties(String brokerList, String groupID) {
        Properties consumerProperties = new Properties();
        consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
        // Offsets are committed explicitly via commitSync() after seeking, so
        // auto-commit must stay off or the broker could overwrite our seek.
        consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        consumerProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1); // limit records fetched per poll
        consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        return consumerProperties;
    }

    /**
     * Demo entry point.
     *
     * NOTE(review): {@link #testData()} loops forever producing records, so the
     * statements below it are never reached; comment it out to exercise the
     * offset-reset methods with the sample connection parameters.
     *
     * @throws InterruptedException if the producing thread is interrupted
     */
    public static void main(String[] args) throws InterruptedException {
        testData();

        String brokerList = "192.168.2.234:9092";
        String groupID = "test-consumer";
        String topic = "test";  // Kafka topic whose committed offsets would be reset

    }


    /**
     * Resets the group's committed offsets so consumption restarts at the
     * messages produced at/after 2020-04-27 14:14:00 (UTC+8).
     *
     * Kept for backward compatibility; delegates to
     * {@link #setDateTime(String, String, String, LocalDateTime)}.
     *
     * @param brokerList comma-separated broker addresses
     * @param groupID    consumer group id
     * @param topic      topic whose offsets are reset
     */
    public static void setDateTime(String brokerList, String groupID, String topic) {
        setDateTime(brokerList, groupID, topic, LocalDateTime.of(2020, 4, 27, 14, 14, 0));
    }

    /**
     * Resets the group's committed offsets so consumption restarts at the
     * messages produced at/after the given local date-time (interpreted in UTC+8).
     *
     * @param brokerList comma-separated broker addresses
     * @param groupID    consumer group id
     * @param topic      topic whose offsets are reset
     * @param dateTime   local date-time (UTC+8) to rewind to
     */
    public static void setDateTime(String brokerList, String groupID, String topic, LocalDateTime dateTime) {
        long targetTimestamp = dateTime.toInstant(ZoneOffset.ofHours(8)).toEpochMilli();
        resetOffsetsToTimestamp(brokerList, groupID, topic, targetTimestamp);
    }


    /**
     * Rewinds the group's committed offsets to 30 minutes before now.
     *
     * Kept for backward compatibility; delegates to
     * {@link #setDuration(String, String, String, long)}.
     *
     * @param brokerList comma-separated broker addresses
     * @param groupID    consumer group id
     * @param topic      topic whose offsets are reset
     */
    public static void setDuration(String brokerList, String groupID, String topic) {
        setDuration(brokerList, groupID, topic, 30);
    }

    /**
     * Rewinds the group's committed offsets to {@code minutes} minutes before now.
     *
     * @param brokerList comma-separated broker addresses
     * @param groupID    consumer group id
     * @param topic      topic whose offsets are reset
     * @param minutes    how many minutes to rewind from the current time
     */
    public static void setDuration(String brokerList, String groupID, String topic, long minutes) {
        resetOffsetsToTimestamp(brokerList, groupID, topic,
                System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(minutes));
    }

    /**
     * Shared implementation: for every partition of {@code topic}, looks up the
     * earliest offset whose message timestamp is {@code >= timestampMs}, seeks
     * to it, and commits the new positions for the consumer group.
     *
     * @param brokerList  comma-separated broker addresses
     * @param groupID     consumer group id
     * @param topic       topic whose offsets are reset
     * @param timestampMs epoch-millisecond timestamp to rewind to
     */
    private static void resetOffsetsToTimestamp(String brokerList, String groupID, String topic, long timestampMs) {
        Properties consumerProperties = getProperties(brokerList, groupID);
        try (final KafkaConsumer<String, String> consumer =
                     new KafkaConsumer<>(consumerProperties)) {
            consumer.subscribe(Collections.singleton(topic));
            // Blocking poll(0) joins the group so that the subsequent seek/commit
            // happens with a valid member assignment.
            consumer.poll(0);
            Map<TopicPartition, Long> timeToSearch = consumer.partitionsFor(topic).stream()
                    .map(info -> new TopicPartition(topic, info.partition()))
                    .collect(Collectors.toMap(Function.identity(), tp -> timestampMs));
            for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry :
                    consumer.offsetsForTimes(timeToSearch).entrySet()) {
                // offsetsForTimes maps a partition to null when it holds no message
                // at/after the timestamp — skip it instead of throwing an NPE.
                if (entry.getValue() == null) {
                    continue;
                }
                consumer.seek(entry.getKey(), entry.getValue().offset());
            }
            // One commit covers every partition seeked above.
            consumer.commitSync();
        }
    }

    /**
     * Continuously produces timestamped test records to the "test" topic,
     * one record every 10 ms. Never returns.
     *
     * @throws InterruptedException if the producing thread is interrupted while sleeping
     */
    public static void testData() throws InterruptedException {
        String topic = "test";
        KafkaProducerClient producerClient = new KafkaProducerClient();
        Producer<String, String> producer = producerClient.getProducer();
        while (true) {
            String data = DateUtil.date().toString();
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, data);
            producer.send(record); // fire-and-forget; the returned Future is intentionally unused
            Thread.sleep(10);
        }
    }


}
